package simpledb.tx;

import simpledb.server.SimpleDB;
import simpledb.file.Block;
import simpledb.buffer.*;
import simpledb.tx.recovery.CheckpointRecord;
import simpledb.tx.recovery.RecoveryMgr;
import simpledb.tx.concurrency.ConcurrencyMgr;
import simpledb.tx.concurrency.LockAbortException;
import simpledb.tx.recovery.LogRecord;

/**
 * Provides transaction management for clients,
 * ensuring that all transactions are serializable, recoverable,
 * and in general satisfy the ACID properties.
 *
 * @author Edward Sciore
 */
public class Transaction {
    private static final int END_OF_FILE = -1;

    /**
     * Checkpoint period. A quiescent checkpoint is triggered by every
     * transaction whose number is a multiple of
     * {@code CHECKPOINT_INTERVAL + 1}.
     */
    private static final int CHECKPOINT_INTERVAL = 60000;

    /** Length of the pause taken at the start of a quiescent checkpoint. */
    private static final long CHECKPOINT_PAUSE_MILLIS = 5000L;

    /* All static state below is guarded by the Transaction.class monitor. */
    private static int nextTxNum = 0;
    private static int activeTransCount = 0;
    private static boolean isQuiescentCheckpointingOn = false;

    private final RecoveryMgr recoveryMgr;
    private final ConcurrencyMgr concurMgr;
    private final int txnum;
    private final BufferList myBuffers = new BufferList();

    /**
     * True while this transaction is included in {@code activeTransCount}.
     * Guarded by the Transaction.class monitor. It makes the decrement
     * idempotent, so ending the same transaction more than once
     * (for example a rollback inside {@link #setInt} followed by the
     * caller's own rollback) cannot drive the counter negative.
     */
    private boolean countedAsActive = false;

    /**
     * Creates a new transaction and its associated
     * recovery and concurrency managers.
     * Before the transaction becomes active, it waits for any quiescent
     * checkpoint in progress to finish; every
     * {@code CHECKPOINT_INTERVAL + 1}-th transaction performs such a
     * checkpoint itself.
     * This constructor depends on the file, log, and buffer
     * managers that it gets from the class
     * {@link simpledb.server.SimpleDB}.
     * Those objects are created during system initialization.
     * Thus this constructor cannot be called until either
     * {@link simpledb.server.SimpleDB#init(String)} or
     * {@link simpledb.server.SimpleDB#initFileLogAndBufferMgr(String)}
     * has been called.
     *
     * @throws LockAbortException if the calling thread is interrupted
     *         while waiting for a quiescent checkpoint
     */
    public Transaction() {
        txnum = nextTxNumber();
        performCheckPointing(txnum, CHECKPOINT_INTERVAL);
        registerActive(this);
        boolean constructed = false;
        try {
            recoveryMgr = new RecoveryMgr(txnum);
            concurMgr = new ConcurrencyMgr();
            constructed = true;
        } finally {
            if (!constructed) {
                // Do not leave a phantom active transaction behind;
                // a later checkpoint would wait for it forever.
                deregisterActive(this);
            }
        }
    }

    /**
     * Returns the number identifying this transaction.
     *
     * @return the transaction number
     */
    public int getTxnum() {
        return txnum;
    }

    /**
     * Counts the specified transaction as active.
     *
     * @param tx the transaction that is starting
     */
    private static synchronized void registerActive(Transaction tx) {
        if (!tx.countedAsActive) {
            tx.countedAsActive = true;
            activeTransCount++;
        }
    }

    /**
     * Stops counting the specified transaction as active and, when no
     * active transaction remains, wakes up a waiting checkpoint.
     * Calling this method again for the same transaction has no effect.
     *
     * @param tx the transaction that has ended
     */
    private static synchronized void deregisterActive(Transaction tx) {
        if (!tx.countedAsActive) {
            return;
        }
        tx.countedAsActive = false;
        activeTransCount--;
        if (activeTransCount == 0) {
            Transaction.class.notifyAll();
        }
    }

    /**
     * Restores the interrupt status of the current thread and builds the
     * exception used to abort the wait, keeping the interruption as cause.
     *
     * @param cause the interruption that ended the wait
     * @return the exception to be thrown by the caller
     */
    private static LockAbortException abortAfterInterrupt(InterruptedException cause) {
        Thread.currentThread().interrupt();
        LockAbortException abort = new LockAbortException();
        abort.initCause(cause);
        return abort;
    }

    /**
     * Implements quiescent checkpointing.
     * The calling transaction first waits until no checkpoint is in
     * progress. If its number is a multiple of {@code n + 1}, it then
     * performs a checkpoint: it blocks new transactions, waits until
     * all active transactions have ended, writes and flushes a
     * checkpoint record, and finally lets the blocked transactions go.
     *
     * @param txnum the number of the calling transaction
     * @param n     the checkpoint period
     * @throws LockAbortException if the calling thread is interrupted
     *         while waiting
     */
    private static synchronized void performCheckPointing(int txnum, int n) {
        while (isQuiescentCheckpointingOn) {
            try {
                System.out.println("Transaction #"+txnum+" on hold for Quiescent Checkpointing");
                Transaction.class.wait();
            } catch (InterruptedException e) {
                System.err.println("(A): Interrupted Exception Caught, " +
                        "throwing a LockAbortException");
                throw abortAfterInterrupt(e);
            }
        }

        if (txnum % (n + 1) != 0) {
            return;
        }

        isQuiescentCheckpointingOn = true;
        boolean interruptedDuringPause = false;
        try {
            System.out.println("Quiescent Checkpointing Started, this thread is now sleeping for 5 seconds");
            // NOTE(review): the pause is taken while holding the class
            // monitor, so transactions can neither start nor end
            // until it is over.
            try {
                Thread.sleep(CHECKPOINT_PAUSE_MILLIS);
            } catch (InterruptedException e) {
                // Keep going, but remember the interrupt; it is restored
                // only after the log I/O below has been done.
                interruptedDuringPause = true;
            }

            while (activeTransCount > 0) {
                try {
                    System.out.println("Transaction #"+txnum+" on hold for Quiescent Checkpointing");
                    Transaction.class.wait();
                } catch (InterruptedException e) {
                    System.err.println("(B): Interrupted Exception Caught, " +
                            "throwing a LockAbortException");
                    throw abortAfterInterrupt(e);
                }
            }

            LogRecord rec = new CheckpointRecord();
            int lsn = rec.writeToLog();
            SimpleDB.logMgr().flush(lsn);
            System.out.println("Quiescent Checkpointing Finished");
        } finally {
            // Always reopen the gate, even when the checkpoint is aborted;
            // otherwise every later transaction would wait forever.
            isQuiescentCheckpointingOn = false;
            Transaction.class.notifyAll();
            if (interruptedDuringPause) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Commits the current transaction.
     * Flushes all modified buffers (and their log records),
     * writes and flushes a commit record to the log,
     * releases all locks, and unpins any pinned buffers.
     * The transaction then stops counting as active.
     */
    public void commit() {
        recoveryMgr.commit();
        concurMgr.release();
        myBuffers.unpinAll();
        deregisterActive(this);
    }

    /**
     * Rolls back the current transaction.
     * Undoes any modified values,
     * flushes those buffers,
     * writes and flushes a rollback record to the log,
     * releases all locks, and unpins any pinned buffers.
     * The transaction then stops counting as active.
     */
    public void rollback() {
        recoveryMgr.rollback();
        concurMgr.release();
        myBuffers.unpinAll();
        deregisterActive(this);
    }

    /**
     * Ends the current transaction without involving the
     * recovery manager: releases all locks, unpins any pinned
     * buffers, and stops counting the transaction as active.
     */
    public void undo() {
        concurMgr.release();
        myBuffers.unpinAll();
        deregisterActive(this);
    }

    /**
     * Flushes all modified buffers.
     * Then goes through the log, rolling back all
     * uncommitted transactions.  Finally,
     * writes a quiescent checkpoint record to the log.
     * This method is called only during system startup,
     * before user transactions begin.
     */
    public void recover() {
        SimpleDB.bufferMgr().flushAll(txnum);
        recoveryMgr.recover();
    }

    /**
     * Pins the specified block.
     * The transaction manages the buffer for the client.
     *
     * @param blk a reference to the disk block
     */
    public void pin(Block blk) {
        myBuffers.pin(blk);
    }

    /**
     * Unpins the specified block.
     * The transaction looks up the buffer pinned to this block,
     * and unpins it.
     *
     * @param blk a reference to the disk block
     */
    public void unpin(Block blk) {
        myBuffers.unpin(blk);
    }

    /**
     * Returns the integer value stored at the
     * specified offset of the specified block.
     * The method first obtains an SLock on the block,
     * then it calls the buffer to retrieve the value.
     *
     * @param blk    a reference to a disk block
     * @param offset the byte offset within the block
     * @return the integer stored at that offset
     */
    public int getInt(Block blk, int offset) {
        concurMgr.sLock(blk);
        Buffer buff = myBuffers.getBuffer(blk);
        return buff.getInt(offset);
    }

    /**
     * Returns the string value stored at the
     * specified offset of the specified block.
     * The method first obtains an SLock on the block,
     * then it calls the buffer to retrieve the value.
     *
     * @param blk    a reference to a disk block
     * @param offset the byte offset within the block
     * @return the string stored at that offset
     */
    public String getString(Block blk, int offset) {
        concurMgr.sLock(blk);
        Buffer buff = myBuffers.getBuffer(blk);
        return buff.getString(offset);
    }

    /**
     * Stores an integer at the specified offset
     * of the specified block.
     * The method first obtains an XLock on the block.
     * It then reads the current value at that offset,
     * puts it into an update log record, and
     * writes that record to the log.
     * Finally, it calls the buffer to store the value,
     * passing in the LSN of the log record and the transaction's id.
     * If the XLock cannot be obtained, the transaction is rolled back
     * before the exception is passed on to the caller.
     *
     * @param blk    a reference to the disk block
     * @param offset a byte offset within that block
     * @param val    the value to be stored
     * @throws LockAbortException if the XLock cannot be obtained
     */
    public void setInt(Block blk, int offset, int val) {
        try {
            concurMgr.xLock(blk);
        } catch (LockAbortException l) {
            this.rollback();
            throw l;
        }
        Buffer buff = myBuffers.getBuffer(blk);
        int lsn = recoveryMgr.setInt(buff, offset, val);
        buff.setInt(offset, val, txnum, lsn);
    }

    /**
     * Stores a string at the specified offset
     * of the specified block.
     * The method first obtains an XLock on the block.
     * It then reads the current value at that offset,
     * puts it into an update log record, and
     * writes that record to the log.
     * Finally, it calls the buffer to store the value,
     * passing in the LSN of the log record and the transaction's id.
     * If the XLock cannot be obtained, the transaction is rolled back
     * before the exception is passed on to the caller.
     *
     * @param blk    a reference to the disk block
     * @param offset a byte offset within that block
     * @param val    the value to be stored
     * @throws LockAbortException if the XLock cannot be obtained
     */
    public void setString(Block blk, int offset, String val) {
        try {
            concurMgr.xLock(blk);
        } catch (LockAbortException l) {
            this.rollback();
            throw l;
        }
        Buffer buff = myBuffers.getBuffer(blk);
        int lsn = recoveryMgr.setString(buff, offset, val);
        buff.setString(offset, val, txnum, lsn);
    }

    /**
     * Returns the number of blocks in the specified file.
     * This method first obtains an SLock on the
     * "end of the file", before asking the file manager
     * to return the file size.
     *
     * @param filename the name of the file
     * @return the number of blocks in the file
     */
    public int size(String filename) {
        Block dummyblk = new Block(filename, END_OF_FILE);
        concurMgr.sLock(dummyblk);
        return SimpleDB.fileMgr().size(filename);
    }

    /**
     * Appends a new block to the end of the specified file
     * and returns a reference to it.
     * This method first obtains an XLock on the
     * "end of the file", before performing the append.
     *
     * @param filename the name of the file
     * @param fmtr     the formatter used to initialize the new page
     * @return a reference to the newly-created disk block
     */
    public Block append(String filename, PageFormatter fmtr) {
        Block dummyblk = new Block(filename, END_OF_FILE);
        concurMgr.xLock(dummyblk);
        Block blk = myBuffers.pinNew(filename, fmtr);
        unpin(blk);
        return blk;
    }

    /**
     * Hands out the next transaction number.
     *
     * @return a number not given to any earlier transaction
     */
    private static synchronized int nextTxNumber() {
        nextTxNum++;
        return nextTxNum;
    }
}
